package com.shujia.flink.sink

import java.util.concurrent.TimeUnit

import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
import org.apache.flink.streaming.api.scala._

object Demo1Sink {
  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment


    val ds: DataStream[String] = env.socketTextStream("master", 9999)


//    val sink: PrintSinkFunction[String] = new PrintSinkFunction[String]
    //    ds.addSink(sink)

    //    ds.print()


    //    ds.writeAsText("flink/data/sink", WriteMode.OVERWRITE)

    val sink: StreamingFileSink[String] = StreamingFileSink
      .forRowFormat(new Path("flink/data/sink"), new SimpleStringEncoder[String]("UTF-8"))
      .withRollingPolicy(
        DefaultRollingPolicy.builder()
          .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
          .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
          .withMaxPartSize(1024 * 1024 * 1024)
          .build())
      .build()


    ds.addSink(sink)

    env.execute()

  }

}
